# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License;
# you may not use this file except in compliance with the Elastic License.

<%
# Define the default inputs to use and a list of valid aliases
defined_inputs = configured_inputs(["kafka"], {"eventbroker" => "kafka", "smartconnector" => "tcp"})

alias_settings_keys!(
    {
        "var.input.kafka" => "var.input.eventbroker",
        "var.input.tcp"   => "var.input.smartconnector"
    })
require 'arcsight_module_config_helper'
%>

input {
  <% if defined_inputs.include?("kafka") %>
  kafka {
    codec => cef
    bootstrap_servers => <%= csv_string(get_setting(LogStash::Setting::SplittableStringArray.new("var.input.kafka.bootstrap_servers", String, "localhost:39092"))) %>
    topics => <%= array_to_string(get_setting(LogStash::Setting::SplittableStringArray.new("var.input.kafka.topics", String, ["eb-cef"]))) %>
    <%= LogStash::Arcsight::ConfigHelper.kafka_input_ssl_sasl_config(self) %>
    type => syslog
  }
  <% end %>

  <% if defined_inputs.include?("tcp") %>
  tcp {
    # The delimiter config used is for TCP interpretation
    codec => cef { delimiter => "\r\n" }
    port => <%= setting("var.input.tcp.port", 5000) %>
    <%= LogStash::Arcsight::ConfigHelper.tcp_input_ssl_config(self) %>
    type => syslog
  }
  <% end %>
}

filter {

  # Map the @timestamp with the event time, as recorded in deviceReceiptTime

  date {
    match => [ "deviceReceiptTime", "MMM dd yyyy HH:mm:ss", "MMM  d yyyy HH:mm:ss", "UNIX_MS" ]
  }

  # To map the attacker Geo IP if plausible

  geoip {
    source => "sourceAddress"
    target => "source"
  }

  # To map the target Geo IP if plausible

  geoip {
    source => "destinationAddress"
    target => "destination"
  }

  # To map the log producing device Geo IP if plausible

  geoip {
    source => "deviceAddress"
    target => "device"
  }
}

output {
  <%= elasticsearch_output_config('syslog') %>
}
